# --- Environment setup: locate the local Spark install and start a session ---
import findspark
findspark.init()
findspark.find()
# 'C:\\Users\\hag8665\\Anaconda3\\envs\\new_environment\\lib\\site-packages\\pyspark'
# (notebook output above — a bare string no-op; kept as a comment)
# !pip install seaborn
# !pip install sklearn
# !pip install folio
# !pip install folium
import pandas as pd
import numpy as np
import seaborn as sns
import pyspark
from pyspark import SparkContext, SparkConf, StorageLevel, SQLContext
from platform import python_version
print(python_version())
# 3.6.13  (notebook output — the bare literal was a SyntaxError in a .py file)
from collections import namedtuple
from pyspark.sql import SparkSession
# sc = SparkContext.getOrCreate()
# sqlContext = SQLContext(sc)
# Entry point for all DataFrame / ML work below.
spark = SparkSession.builder.appName("MLR").getOrCreate()
# --- Download the SF Airbnb dataset from GitHub, cache locally, load in Spark ---
import os

airbnbdf = pd.read_parquet('https://github.com/databricks/LearningSparkV2/raw/master/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/part-00000-tid-4320459746949313749-5c3d407c-c844-4016-97ad-2edec446aa62-6688-1-c000.snappy.parquet')
# Make sure the target folder exists before writing the cache file.
os.makedirs('data', exist_ok=True)
airbnbdf.to_parquet('data/airbnb.parquet')
# <Response [200]>  (notebook output — was a syntax error in a .py file)
# Read the cached file back with Spark. NOTE(review): the original used a
# hard-coded absolute Windows path; use the relative path we just wrote to.
filePath = 'data/airbnb.parquet'
airbnbDF = spark.read.parquet(filePath)
airbnbDF.columns
# (notebook output: 34 column names including 'price' and the '*_na' indicators)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").show(5)
# (notebook output table omitted)
type(airbnbDF)  # pyspark.sql.dataframe.DataFrame
# get data info
airbnbDF.printSchema()
# (notebook schema output omitted)
# --- Drop the trailing '*_na' indicator columns; collect numeric column names ---
# NOTE: the original built these lists with append loops and named the list
# 'col', which is later shadowed by pyspark.sql.functions.col.
all_columns = [name for name, _ in airbnbDF.dtypes]
# The last 10 columns are the '*_na' missing-value indicators — drop them.
airbnbDF = airbnbDF.select(all_columns[:-10])
# Numeric (non-string) columns, used for the correlation matrix below.
numcol = [name for name, dtype in airbnbDF.dtypes if dtype != 'string']
# summary of key numerical columns
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms",
                "number_of_reviews", "price").describe().show()
# (notebook output table omitted — count/mean/stddev/min/max of the 6 columns)
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.mllib.stat import Statistics
# result can be used w/ seaborn's heatmap
def compute_correlation_matrix(df, method='pearson'):
    """Return the pairwise correlation matrix of a numeric Spark DataFrame.

    Wraps pyspark.mllib.stat.Statistics.corr (see
    https://forums.databricks.com/questions/3092/how-to-calculate-correlation-matrix-with-all-colum.html)
    and labels the result with the DataFrame's column names so it can be fed
    directly to seaborn's heatmap.
    """
    # Statistics.corr wants an RDD of numeric vectors, not Rows.
    vector_rdd = df.rdd.map(tuple)
    matrix = Statistics.corr(vector_rdd, method=method)
    labels = df.columns
    return pd.DataFrame(matrix, index=labels, columns=labels)
# --- Correlation heatmap of the numeric columns ---
corr = compute_correlation_matrix(airbnbDF.select(numcol))
# Generate a mask for the upper triangle
mask = np.triu(np.ones_like(corr, dtype=bool))
# Generate a custom diverging colormap
cmap = sns.diverging_palette(230, 20, as_cmap=True)
# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))
sns.heatmap(corr, cmap=cmap, vmax=0.2, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": 1}).set(title='Correlation Plot numeric columns')
# [Text(0.5, 1.0, 'Correlation Plot numeric columns')]
# (notebook output above — would raise NameError('Text') if executed)
import plotly.express as px
# price distribution plot
px.histogram(airbnbDF.toPandas()['price'])
# --- Log-transform the skewed price target, then train/test split ---
from pyspark.sql.functions import col
import pyspark.sql.functions as F
# F.log is the natural log; reduces the heavy right skew seen in the histogram.
airbnbDF = airbnbDF.withColumn("logprice", F.log(col("price")))
# log price distribution plot
px.histogram(airbnbDF.toPandas()['logprice'])
# Fixed seed so the 80/20 split is reproducible across runs.
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set,
and {testDF.count()} in the test set""")
# There are 5780 rows in the training set, and 1366 in the test set
# (notebook output above — was a syntax error in a .py file)
# --- Assemble the four numeric predictors into a single feature vector ---
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["accommodates", "bedrooms", "bathrooms", "beds"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "logprice").show(10)
# (notebook output table omitted — bedrooms / features vector / logprice rows)
# --- Baseline linear regression on the 4 numeric features, via a Pipeline ---
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol="logprice")
lrModel = lr.fit(vecTrainDF)
# create pipeline so the same assembler + model can be applied to test data
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "logprice", "prediction").show(10)
# (notebook output table omitted — logprice vs. prediction rows)
# --- Evaluate the baseline model: RMSE, R2, residual plot ---
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="logprice",
    metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
# RMSE is 0.5  (notebook output)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
# R2 is 0.3849199059028645  (notebook output)
# Collect once — toPandas() triggers a full Spark job each call.
pred_pd = predDF.toPandas()
# seaborn >= 0.12 requires keyword args for residplot's x/y.
sns.residplot(x=pred_pd['prediction'],
              y=pred_pd['logprice'] - pred_pd['prediction']).set(title='Residual Plot')
# [Text(0.5, 1.0, 'Residual Plot')]  (notebook output)
# --- Full model: one-hot encode categoricals + all numeric features ---
from pyspark.ml.feature import OneHotEncoder, StringIndexer
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
# handleInvalid="skip" drops test rows with categories unseen during training.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols)
# All doubles except the label ('logprice') and its raw form ('price').
numericCols = [field for (field, dataType) in trainDF.dtypes
               if dataType == "double" and field != "price" and field != "logprice"]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
lr = LinearRegression(labelCol="logprice", featuresCol="features")
pipeline = Pipeline(stages=[stringIndexer, oheEncoder, vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "logprice", "prediction").show(5)
# (notebook output table omitted — sparse feature vectors with predictions)
# --- Evaluate the one-hot-encoded model: RMSE, R2, residual plot ---
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="logprice",
    metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
# RMSE is 0.4  (notebook output)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
# R2 is 0.5675727853839072  (notebook output)
# Collect once — toPandas() triggers a full Spark job each call.
pred_pd = predDF.toPandas()
# seaborn >= 0.12 requires keyword args for residplot's x/y.
sns.residplot(x=pred_pd['prediction'],
              y=pred_pd['logprice'] - pred_pd['prediction']).set(title='Residual Plot')
# [Text(0.5, 1.0, 'Residual Plot')]  (notebook output)